﻿using Cyss.Core.Cache;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Cyss.Core.RabbitMQ
{
    /// <summary>
    /// Publishes messages to RabbitMQ and registers queue consumers, using one channel
    /// created from the injected persistent connection.
    /// </summary>
    /// <remarks>
    /// Every publish and consume operation of an instance goes through the single channel
    /// created in the constructor.
    /// NOTE(review): the RabbitMQ .NET client guide advises against sharing a channel between
    /// threads; confirm that callers do not use one instance concurrently.
    /// </remarks>
    public class RabbitMQService
    {
        private readonly IRabbitMQPersistentConnection _mQPersistentConnection;
        private readonly IStaticCacheManager _staticCacheManager;
        private readonly IModel _channel;

        /// <summary>
        /// Stores the dependencies and opens the channel used by every other member.
        /// </summary>
        /// <param name="mQPersistentConnection">Connection wrapper the channel is created from.</param>
        /// <param name="staticCacheManager">Cache used by the transactional <c>Send</c> overload.</param>
        public RabbitMQService(IRabbitMQPersistentConnection mQPersistentConnection, IStaticCacheManager staticCacheManager)
        {
            _mQPersistentConnection = mQPersistentConnection;
            _staticCacheManager = staticCacheManager;
            _channel = _mQPersistentConnection.CreateModel();
        }

        /// <summary>
        /// Wraps <paramref name="content"/> in a <c>MessageModel</c> tagged with
        /// <paramref name="TransactionId"/>, publishes it, and then records
        /// <paramref name="queueName"/> in the transaction entry cached under that id.
        /// </summary>
        /// <remarks>
        /// The message is published before the cache is consulted, so a failure of the cache
        /// lookup surfaces after the message has already been handed to the channel.
        /// </remarks>
        /// <typeparam name="T">Type of the payload.</typeparam>
        /// <param name="queueName">Queue to declare, bind and record in the transaction entry.</param>
        /// <param name="content">Payload; nothing happens when it is null.</param>
        /// <param name="TransactionId">Transaction identifier; also the cache key.</param>
        /// <param name="exchangeName">Exchange to declare and publish to.</param>
        /// <param name="routingKey">Routing key used for the binding and the publish.</param>
        /// <param name="exchangeType">Exchange type; defaults to <c>ExchangeType.Direct</c>.</param>
        /// <exception cref="Exception">
        /// Thrown by the fallback passed to the cache lookup (presumably invoked when the
        /// transaction is not cached; verify against <c>IStaticCacheManager</c>).
        /// </exception>
        public void Send<T>(string queueName, T content, string TransactionId, string exchangeName = RabbitMQOptions.ExchangeName, string routingKey = RabbitMQOptions.ExchangeName, string exchangeType = ExchangeType.Direct)
        {
            if (content == null)
            {
                return;
            }

            var message = new MessageModel();
            message.TransactionId = TransactionId;
            message.Content = content;

            // Forward the caller's routing arguments. Named arguments pin the call to the
            // string overload so it can never bind to this transactional overload again.
            Send(queueName, message.ToSerializeObject(), exchangeName: exchangeName, routingKey: routingKey, exchangeType: exchangeType);

            var transactionCacheCollections = _staticCacheManager.Get<TransactionCacheCollections>(
                TransactionId,
                () => { throw new Exception($"缓存中没有找到事务:Id{TransactionId}"); });
            transactionCacheCollections.Add(queueName);

            // The entry is written back in serialized form with a cache-time argument of 30
            // (unit is defined by IStaticCacheManager.Set).
            _staticCacheManager.Set(TransactionId, JsonHelper.SerializeObject(transactionCacheCollections), 30);
        }

        /// <summary>
        /// Serializes <paramref name="content"/> with <c>ToSerializeObject</c> and publishes the result.
        /// </summary>
        /// <typeparam name="T">Type of the payload.</typeparam>
        /// <param name="queueName">Queue to declare and bind before publishing.</param>
        /// <param name="content">Payload; nothing is sent when it is null.</param>
        /// <param name="exchangeName">Exchange to declare and publish to.</param>
        /// <param name="routingKey">Routing key used for the binding and the publish.</param>
        /// <param name="exchangeType">Exchange type; defaults to <c>ExchangeType.Direct</c>.</param>
        public void Send<T>(string queueName, T content, string exchangeName = RabbitMQOptions.ExchangeName, string routingKey = RabbitMQOptions.ExchangeName, string exchangeType = ExchangeType.Direct)
        {
            if (content == null)
            {
                return;
            }

            Send(queueName, content.ToSerializeObject(), exchangeName, routingKey, exchangeType);
        }

        /// <summary>
        /// Declares the exchange and queue, binds them, and publishes <paramref name="content"/>
        /// as a UTF-8 encoded persistent message.
        /// </summary>
        /// <param name="queueName">Queue to declare and bind before publishing.</param>
        /// <param name="content">Message text; nothing is sent when it is null or empty.</param>
        /// <param name="exchangeName">Exchange to declare and publish to.</param>
        /// <param name="routingKey">Routing key used for the binding and the publish.</param>
        /// <param name="exchangeType">Exchange type; defaults to <c>ExchangeType.Direct</c>.</param>
        public void Send(string queueName, string content, string exchangeName = RabbitMQOptions.ExchangeName, string routingKey = RabbitMQOptions.ExchangeName, string exchangeType = ExchangeType.Direct)
        {
            if (string.IsNullOrEmpty(content))
            {
                return;
            }

            DeclareAndBind(queueName, exchangeName, routingKey, exchangeType);
            var properties = CreatePersistentProperties();
            byte[] body = Encoding.UTF8.GetBytes(content);

            // Publisher confirms are not enabled on the channel, so the result of the
            // publish is not verified here.
            _channel.BasicPublish(exchangeName, routingKey, properties, body);
        }

        /// <summary>
        /// Declares the exchange and queue, binds them once, and publishes every entry of
        /// <paramref name="contents"/> as a UTF-8 encoded persistent message.
        /// </summary>
        /// <param name="queueName">Queue to declare and bind before publishing.</param>
        /// <param name="contents">Message texts; nothing is sent when the list is null or empty. Null entries are skipped.</param>
        /// <param name="exchangeName">Exchange to declare and publish to.</param>
        /// <param name="routingKey">Routing key used for the binding and the publish.</param>
        /// <param name="exchangeType">Exchange type; defaults to <c>ExchangeType.Direct</c>.</param>
        public void Send(string queueName, List<string> contents, string exchangeName = RabbitMQOptions.ExchangeName, string routingKey = RabbitMQOptions.ExchangeName, string exchangeType = ExchangeType.Direct)
        {
            if (contents == null || contents.Count == 0)
            {
                return;
            }

            DeclareAndBind(queueName, exchangeName, routingKey, exchangeType);
            var properties = CreatePersistentProperties();

            foreach (var content in contents)
            {
                // Encoding a null string throws; skipping it keeps one bad entry from
                // aborting the batch after earlier messages were already published.
                if (content == null)
                {
                    continue;
                }

                byte[] body = Encoding.UTF8.GetBytes(content);
                _channel.BasicPublish(exchangeName, routingKey, properties, body);
            }
        }

        /// <summary>
        /// Registers a consumer on <paramref name="queueName"/> that deserializes each message
        /// body with <c>JsonHelper.DeserializeObject</c> before passing it to <paramref name="callback"/>.
        /// </summary>
        /// <remarks>
        /// Setup runs on a thread-pool task that is not awaited: the method returns immediately
        /// and exceptions thrown during setup are not surfaced to the caller. A message is
        /// acknowledged even when deserialization or the callback throws, so a failing message
        /// is not redelivered.
        /// </remarks>
        /// <typeparam name="T">Type each message is deserialized to.</typeparam>
        /// <param name="queueName">Queue to declare, bind and consume from.</param>
        /// <param name="callback">Invoked with each deserialized message.</param>
        /// <param name="exchangeName">Exchange to declare and bind the queue to.</param>
        /// <param name="routingKey">Routing key used for the binding.</param>
        /// <param name="exchangeType">Exchange type; defaults to <c>ExchangeType.Direct</c>.</param>
        /// <param name="errorCallback">Optional; invoked after a failed message has been acknowledged.</param>
        public void CreateConsumer<T>(string queueName, Action<T> callback, string exchangeName = RabbitMQOptions.ExchangeName, string routingKey = RabbitMQOptions.ExchangeName, string exchangeType = ExchangeType.Direct, Action errorCallback = null)
        {
            // Deserialization happens inside the handler so a malformed message is treated
            // like any other handler failure.
            StartConsumer(queueName, message => callback(JsonHelper.DeserializeObject<T>(message)), exchangeName, routingKey, exchangeType, errorCallback);
        }

        /// <summary>
        /// Registers a consumer on <paramref name="queueName"/> that passes each message body,
        /// decoded as UTF-8 text, to <paramref name="callback"/>.
        /// </summary>
        /// <remarks>
        /// Setup runs on a thread-pool task that is not awaited: the method returns immediately
        /// and exceptions thrown during setup are not surfaced to the caller. A message is
        /// acknowledged even when the callback throws, so a failing message is not redelivered.
        /// </remarks>
        /// <param name="queueName">Queue to declare, bind and consume from.</param>
        /// <param name="callback">Invoked with the text of each message.</param>
        /// <param name="exchangeName">Exchange to declare and bind the queue to.</param>
        /// <param name="routingKey">Routing key used for the binding.</param>
        /// <param name="exchangeType">Exchange type; defaults to <c>ExchangeType.Direct</c>.</param>
        /// <param name="errorCallback">Optional; invoked after a failed message has been acknowledged.</param>
        public void CreateConsumer(string queueName, Action<string> callback, string exchangeName = RabbitMQOptions.ExchangeName, string routingKey = RabbitMQOptions.ExchangeName, string exchangeType = ExchangeType.Direct, Action errorCallback = null)
        {
            StartConsumer(queueName, callback, exchangeName, routingKey, exchangeType, errorCallback);
        }

        /// <summary>
        /// Declares a durable exchange and a durable, non-exclusive, non-auto-delete queue,
        /// then binds the queue to the exchange with the routing key.
        /// </summary>
        private void DeclareAndBind(string queueName, string exchangeName, string routingKey, string exchangeType)
        {
            _channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);
            _channel.QueueDeclare(queueName, true, false, false, null);
            _channel.QueueBind(queueName, exchangeName, routingKey, null);
        }

        /// <summary>
        /// Creates message properties flagged as persistent.
        /// </summary>
        private IBasicProperties CreatePersistentProperties()
        {
            var properties = _channel.CreateBasicProperties();

            // Persistent is the only flag set: assigning DeliveryMode = 1 (non-persistent)
            // afterwards would cancel it out.
            properties.Persistent = true;
            return properties;
        }

        /// <summary>
        /// Shared consumer setup: declares and binds the queue, limits prefetch to one message,
        /// and starts a manually acknowledged consumer that feeds message text to the handler.
        /// </summary>
        private void StartConsumer(string queueName, Action<string> handler, string exchangeName, string routingKey, string exchangeType, Action errorCallback)
        {
            Task.Run(() =>
            {
                DeclareAndBind(queueName, exchangeName, routingKey, exchangeType);
                _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                var consumer = new EventingBasicConsumer(_channel);
                consumer.Received += (model, ea) =>
                {
                    ReadOnlyMemory<byte> body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    try
                    {
                        handler(message);
                        _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                    catch (Exception)
                    {
                        // The failed message is still acknowledged (one delivery, not a
                        // batch) so it is not redelivered; the error callback is the only
                        // failure signal.
                        _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        errorCallback?.Invoke();
                    }
                };

                _channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
            });
        }
    }
}
